/**
 * Copyright (c) 2015, 玛雅牛［李飞］ (lifei@wellbole.com).
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package com.jfinal.plugin.zbus.proto;

import com.jfinal.log.Logger;
import com.jfinal.plugin.zbus.core.AbstractSender;
import com.jfinal.plugin.zbus.core.TMsgHandler;
import org.reflections.Reflections;
import org.zbus.mq.Protocol;

import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

/**
 * @ClassName: ZbusQueue
 * @Description: AQueue主题对象
 * @since V1.0.0
 */
public class ZbusQueue {

    private static final Logger log = Logger.getLogger(ZbusQueue.class);

    public String mqName;
    public ReceiverQ receiverQ = new ReceiverQ();
    public SenderQ senderQ;

    //单独设置producer和sender的个数.
	public ZbusQueue init(String address,String mqName,Integer maxProducerNum,Integer maxSenderNum){
        this.mqName = mqName;
        this.senderQ = new SenderQ(address,maxProducerNum,maxSenderNum);
        return this;
    }

    //默认producer与sender一一对应
    public ZbusQueue init(String address ,String mqName, Integer maxNum){
        return init(address,mqName,maxNum,maxNum);
    }

    /**
     * @ClassName: RegQ
     * @Description: queue注解注册
     * @since V1.0.0
     */
	@Retention(RetentionPolicy.RUNTIME)
	@Target({ ElementType.TYPE })
	public @interface RegQ {
		String name();
		boolean enable() default true;
        int reloadNum() default 1;
	}

    /**
     * @ClassName: MqSender
     * @Description: Mq泛型发送器
     * @since V1.0.0
     */
    public class SenderQ<T> extends AbstractSender<T> {

        private final String address;
        private final Integer maxProducerNum;
        private final Integer maxSenderNum;

        /**
         * @ClassName: MqSender
         * @Description: 构建一个MQ发送器
         * @since V1.0.0
         */
        public SenderQ(String address,Integer maxProducerNum,Integer maxSenderNum) {
            this.address = address;
            this.maxProducerNum = maxProducerNum;
            this.maxSenderNum = maxSenderNum;
            init();
        }

        //默认只启动一个producer和一个sender
        public SenderQ() {
            this("127.0.0.1",1,1);
        }

        public SenderQ init(String mqName) {
            init(address, mqName, Protocol.MqMode.MQ, maxProducerNum, maxSenderNum);
            return this;
        }

        public SenderQ init() {
            return init(mqName);
        }

    }

    /**
     * MQ消费者配置Map
     */
//    public static Map<String, TMsgHandler<?>> tMsgHandlerQMap = new HashMap<String, TMsgHandler<?>>();
    public static Map<String, ReceiverQ> tMsgReceiverQMap = new HashMap<String, ReceiverQ>();

    public class ReceiverQ {

        public TMsgHandler<?> msgHandler;
        public Integer maxConsumerNum;
        /**
         * @Title: register
         * @Description: 注册消息回调接口
         *            消息到达回调接口
         * @since V1.0.0
         */
//        public ReceiverQ register(String mqName, TMsgHandler<?> msgHandler) {
//            if (tMsgHandlerQMap.containsKey(mqName)) {
//                log.warn("(mq=" + mqName + ")对应的消息处理器已存在!");
//            }
//            tMsgHandlerQMap.put(mqName, msgHandler);
//            return this;
//        }

        public ReceiverQ register(String mqName, Integer consumerNum,TMsgHandler<?> msgHandler) {
            if (tMsgReceiverQMap.containsKey(mqName)) {
                log.warn("(mq=" + mqName + ")对应的消息处理器已存在!");
            }
            this.msgHandler = msgHandler;
            maxConsumerNum = consumerNum;
            tMsgReceiverQMap.put(mqName, this);
            return this;
        }

//        public ReceiverQ register(TMsgHandler<?> msgHandler) {
//            return register(mqName,msgHandler);
//        }

        public ReceiverQ register(Integer consumerNum,TMsgHandler<?> msgHandler) {
            return register(mqName,consumerNum,msgHandler);
        }

        public ReceiverQ autoLoadQAnno(Reflections reflections){
            Set<Class<?>> queueClasses = reflections.getTypesAnnotatedWith(RegQ.class);
            for (Class<?> mc : queueClasses) {
                if(!TMsgHandler.class.isAssignableFrom(mc)){
                    throw new RuntimeException(mc.getName() + " 必须继承自 TMsgHandler<T>");
                }
                RegQ mh = mc.getAnnotation(RegQ.class);
                try {
                    if(mh.enable()) {
                        TMsgHandler<?> hander = (TMsgHandler<?>) mc.newInstance();
                        register(mh.name(),mh.reloadNum(),hander);
                        log.info("通过注解自动加载MQ消息处理器( mq=" + mh.name()  + ",handler=" + mc.getName() + " )");
                    }
                } catch (Exception e) {
                    throw new RuntimeException(e.getMessage(),e);
                }
            }
            return this;
        }

    }


}
